package io.github.bigbird0101.datatransfer.collector;


import integration.modle.exception.IntegrationValidException;
import integration.utils.trace.LogUtils;
import io.github.bigbird0101.datatransfer.constants.PluginType;
import io.github.bigbird0101.datatransfer.record.Record;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public abstract class AbstractTaskPluginCollector extends TaskPluginCollector {


    private Communication communication;

    private String configuration;

    private PluginType pluginType;

    public AbstractTaskPluginCollector() {
    }

    public AbstractTaskPluginCollector(String conf, Communication communication,
                                       PluginType type) {
        this.configuration = conf;
        this.communication = communication;
        this.pluginType = type;
    }

    public Communication getCommunication() {
        return communication;
    }

    public String getConfiguration() {
        return configuration;
    }

    public PluginType getPluginType() {
        return pluginType;
    }

    @Override
    final public void collectMessage(String key, String value) {
        this.communication.addMessage(key, value);
    }

    @Override
    public void collectDirtyRecord(Record dirtyRecord, Throwable t,
                                   String errorMessage) {

        if (null == dirtyRecord) {
            LogUtils.warn(log, "脏数据record=null.");
            return;
        }

        if (this.pluginType.equals(PluginType.READER)) {
            this.communication.increaseCounter(
                    CommunicationTool.READ_FAILED_RECORDS, 1);
            this.communication.increaseCounter(
                    CommunicationTool.READ_FAILED_BYTES, dirtyRecord.getByteSize());
        } else if (this.pluginType.equals(PluginType.WRITER)) {
            this.communication.increaseCounter(
                    CommunicationTool.WRITE_FAILED_RECORDS, 1);
            this.communication.increaseCounter(
                    CommunicationTool.WRITE_FAILED_BYTES, dirtyRecord.getByteSize());
        } else {
            throw new IntegrationValidException(String.format("不知道的插件类型[%s].", this.pluginType));
        }
    }
}
